package com.pms.web;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.baomidou.mybatisplus.mapper.Wrapper;
import com.pms.constant.MQConstant;
import com.pms.entity.WaterPumpGroupAttr;
import com.pms.service.IWaterPumpGroupAttrService;
import com.pms.util.DateUtil;
import com.pms.utils.FiledMark;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * Created by Administrator on 2018/1/8.
 */
@ServerEndpoint(value = "/websocketWaterPump")
@Component
public class WaterPumpWebSocketUtil {
    @Autowired
    IWaterPumpGroupAttrService waterPumpAttrService;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Util类中有静态方法时， @Autowired注解的bean为null.
     * 处理方法如下：
     * 调用@Autowired注解的bean：webSocketUtil1.waterPumpAttrService.xx();
     */
    public static WaterPumpWebSocketUtil waterPumpWebSocketUtil;
    @PostConstruct
    public void init() {
        waterPumpWebSocketUtil = this;
    }

    //静态变量，用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;

    //concurrent包的线程安全Set，用来存放每个客户端对应的MyWebSocket对象。
    // @Component默认是单例模式的，但springboot还是会为每个websocket连接初始化一个bean，所以可以用一个静态set保存起来。
    private static CopyOnWriteArraySet<WaterPumpWebSocketUtil> waterPumpWebSocketSet = new CopyOnWriteArraySet<WaterPumpWebSocketUtil>();

    //与某个客户端的连接会话，需要通过它来给客户端发送数据
    private Session session;

    /**
     * 连接建立成功调用的方法
     * */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
//        String BengId = session.getRequestParameterMap().get("BengId").toString();

        waterPumpWebSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1
            JSONObject messageDataJson = new JSONObject();
            messageDataJson.put("code","200");//连接关闭
            messageDataJson.put("data","建立连接成功，当前在线人数为"+ getOnlineCount());
//            sendMessage("有新连接加入！当前在线人数为" + getOnlineCount());
            sendInfo(messageDataJson.toJSONString());
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        waterPumpWebSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
            JSONObject messageDataJson = new JSONObject();
            messageDataJson.put("code","400");//连接关闭
            messageDataJson.put("data","连接已关闭，当前在线人数为"+ getOnlineCount());
            sendInfo(messageDataJson.toJSONString());
        System.out.println("有一连接关闭！当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
//        List<Map<String, Object>> waterPumpAttrList=new ArrayList<Map<String, Object>>();
//        List<String> filedNameList = null;
//        try{
//            JSONObject messageJson = JSONObject.parseObject(message);
//            String startTimeStr = messageJson.getString("startTime");//起始时间
//            Long reportTimeStart = null;
//            if(StringUtils.isNotBlank(startTimeStr)){
//                reportTimeStart = DateUtil.stringToDate(startTimeStr).getTime()/1000;
//            }
//            if(StringUtils.isBlank(startTimeStr)){// 默认查询当前时间往前2小时 数据
//                reportTimeStart = DateUtil.addDate(new Date(),11,-2).getTime()/1000;
//            }
//            String filedNamesStr = messageJson.getString("filedNames");//查询字段
//            String deviceName = messageJson.getString("deviceName");//设备名称
//            Wrapper<WaterPumpGroupAttr> waterPumpWp =new EntityWrapper<WaterPumpGroupAttr>();
//            if(StringUtils.isBlank(filedNamesStr)){
//                filedNamesStr = getWaterPumpReadFiled();
//            }
//            if(filedNamesStr.lastIndexOf(",")==filedNamesStr.length()-1){
//                filedNamesStr= filedNamesStr.substring(0,filedNamesStr.length()-1);
//            }
//            filedNamesStr = "reportTime,"+filedNamesStr;
//            waterPumpWp.setSqlSelect(filedNamesStr);//设置查询字段
//            waterPumpWp.eq("deviceName",deviceName);//根据设备名称查询
//            waterPumpWp.ge("reportTime",reportTimeStart);//根据上报时间过滤 >=
//            waterPumpAttrList = waterPumpWebSocketUtil.waterPumpAttrService.selectMaps(waterPumpWp);
//            filedNameList= Arrays.asList(filedNamesStr.split(","));
//        }catch (JSONException ex){
//
//        }catch (NullPointerException ex){
//
//        }catch (BadSqlGrammarException ex){
//
//        }
////        String sendsMessage = getSendsMessage(waterPumpAttrList);
//        String sendsMessage = formatWaterPumpPushData(waterPumpAttrList,filedNameList);
//        //群发消息
//        for (WaterPumpWebSocketUtil item : waterPumpWebSocketSet) {
//            try {
//                item.sendMessage(sendsMessage);
//            } catch (IOException e) {
//                e.printStackTrace();
//            }
//        }
        waterPumpWebSocketUtil.rabbitTemplate.convertAndSend( MQConstant.QUEUE_NAME_IOT_FAULT_SMS_PUSH,MQConstant.QUEUE_NAME_IOT_FAULT_SMS_PUSH+":"+message);
        waterPumpWebSocketUtil.rabbitTemplate.convertAndSend( MQConstant.QUEUE_NAME_IOT_FAULT_SOCKET_PUSH,MQConstant.QUEUE_NAME_IOT_FAULT_SOCKET_PUSH+":"+message);
    }

    /**
     * 发生错误时调用
     *
     * */
     @OnError
     public void onError(Session session, Throwable error) {
         System.out.println("发生错误");
         error.printStackTrace();
     }


     public void sendMessage(String message) throws IOException {
//        this.session.getBasicRemote().sendText(message);
        this.session.getAsyncRemote().sendText(message);
     }

    /**
     * 群发自定义消息
     * */
    public static void sendInfo(String message){
        for (WaterPumpWebSocketUtil item : waterPumpWebSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WaterPumpWebSocketUtil.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WaterPumpWebSocketUtil.onlineCount--;
    }

   public static CopyOnWriteArraySet<WaterPumpWebSocketUtil> getWaterPumpWebSocketSet(){
        return waterPumpWebSocketSet;
    }

    public static String getSendsMessage(Object data){
        JSONObject messageDataJson = new JSONObject();
        messageDataJson.put("code","201");//发送数据
        messageDataJson.put("data",data);
        return messageDataJson.toJSONString();
    }


    public static List<Map<String,String>> getWaterPumpFiled(){
        List<Map<String,String>> waterPumpFiledList = new ArrayList<Map<String, String>>();
        Field[] fields = WaterPumpGroupAttr.class.getDeclaredFields();
        for(Field f : fields){
            FiledMark desc = f.getAnnotation(FiledMark.class);
            if(null!=desc){
                Map<String,String> map = new HashMap<String, String>();
                map.put("filedName",f.getName());
                map.put("filedComment",desc.comment());
                waterPumpFiledList.add(map);
            }
        }
        return waterPumpFiledList;
    }

    /**
     * 字符串最后一位有,
     * @return
     */
    private static String getWaterPumpReadFiled(){
        StringBuilder readWaterPumpFiled = new StringBuilder("");
        Field[] fields = WaterPumpGroupAttr.class.getDeclaredFields();
        for(Field f : fields){
            FiledMark desc = f.getAnnotation(FiledMark.class);
            if(null!=desc){
                readWaterPumpFiled.append(f.getName()).append(",");
            }
        }
        return readWaterPumpFiled.toString();
    }
    public static List<String> getWaterPumpFiledNameList(){
        List<String> waterPumpFiledNameList = new ArrayList<String>();
        Field[] fields = WaterPumpGroupAttr.class.getDeclaredFields();
        for(Field f : fields){
            FiledMark desc = f.getAnnotation(FiledMark.class);
            if(null!=desc){
                waterPumpFiledNameList.add(f.getName());
            }
        }
        return waterPumpFiledNameList;
    }

    public static String formatWaterPumpPushData(List<Map<String, Object>> dataList){
        List<String> readFileds = getWaterPumpFiledNameList();
        readFileds.add("reportTime");
        return formatWaterPumpPushData(dataList,readFileds,null);
    }
    public static String formatWaterPumpPushData( List<Map<String, Object>> dataList,List<String> readFileds){
        return formatWaterPumpPushData(dataList,readFileds,null);
    }
    /**
     * 格式化推送数据
     * @param  dataList 返回的数据集合
     * @param  readFileds 数据拆分 key 集合
     * @return
     */
    public static String formatWaterPumpPushData( List<Map<String, Object>> dataList,List<String> readFileds,String code){
        if(readFileds==null||readFileds.isEmpty()){
            readFileds = getWaterPumpFiledNameList();
            readFileds.add("reportTime");
        }
        if(StringUtils.isBlank(code)){
            code ="201";
        }
        JSONObject messageDataJson = new JSONObject();
        if(dataList==null){
            dataList = new ArrayList<Map<String, Object>>();
        }
        if(dataList.isEmpty()){
            messageDataJson.put("data",new JSONObject());
        }
        if(!dataList.isEmpty()){
//            JSONArray dataJsonArr = new JSONArray();
            JSONObject dataJson = new JSONObject();
            for(int y=0;y<readFileds.size();y++){ //遍历查询的字段
                JSONArray filedValueArr = new JSONArray();
                for(int x=0;x<dataList.size();x++){//遍历数据
                    filedValueArr.add(dataList.get(x).get( readFileds.get(y) ));
                }
                dataJson.put(readFileds.get(y),filedValueArr);
            }
            messageDataJson.put("data",dataJson);

        }
        messageDataJson.put("code",code);
        if(code.equals("202")){
            messageDataJson.put("comment","自动推送消息");
        }
        if(code.equals("201")){
            messageDataJson.put("comment","请求返回数据");
        }
        return messageDataJson.toJSONString();
    }
}
